ZMQ producer/consumer state table design
Table of Contents
-
-
1.3 ZMQ consumer state table support async operation(#2-configuration-and-management-requirements)
1 Functional Requirement
1.1 ZMQ client supported operations
-
ZmqClient will send message to ZMQ.
-
ZmqClient can be reused by multiple ZmqProducerStateTable instance.
-
ZmqClient sendMsg() method is thread safe async method, will return immediately, ZMQ lib support async operation.
-
ZmqClient will retry when send not success:
-
When ZMQ socket connection broken, send API will failed and need re-connect and send again.
-
When ZMQ send queue is full, send API will failed and need retry later.
-
When a signal come, ZMQ send API will failed, need retry again.
-
-
ZmqClient will throw exception after retry failed.
-
ZmqClient will throw exception when ZMQ connection break.
1.2 ZMQ server supported operations
-
ZmqServer will start a receive thread and receive message from ZMQ.
-
ZmqServer can be reused by multiple ZmqConsumerStateTable instance.
-
When ZmqServer receive message from ZMQ, ZmqServer will:
-
De-serialize received message.
-
Find ZmqMessageHandler by message content.
-
Dispatch message to ZmqMessageHandler.
-
1.3 ZMQ producer state table supported operations
-
Producer table will use ZmqClient to send message.
-
Should support following operations.
-
Set: void set(const std::string &key, const std::vector &values, const std::string &op = SET_COMMAND, const std::string &prefix = EMPTY_PREFIX)
-
Delete: void del(const std::string &key, const std::string &op = DEL_COMMAND, const std::string &prefix = EMPTY_PREFIX)
-
Batch Set: void set(const std::vector& values)
-
Batch Delete: void del(const std::vectorstd::string& keys)
-
1.3 ZMQ consumer state table supported operations
-
Consumer implement ZmqMessageHandler interface.
-
When consumer table receive message from ZmqServer, consumer table will:
-
Send notification to select to handle received operation.
-
Send notification to DB update thread for write received operation to database. This is a configurable feature, could turn on/off this feature in use cases requiring less memory consumption or higher performance.
-
After send notification, continue receive next message from ZMQ.
-
2 Design
-
Diagram:
-
Sequence:
-
Call ZmqProducerStateTable API.
-
ZmqProducerStateTable will send message with ZmqClient.
-
ZmqClient will serialize operation and send to ZMQ.
-
ZmqClient will add database name and table name to message for ZmqServer side message dispatch.
-
Return when send success.
-
Retry when send failed.
-
Throw exception when retry failed.
-
-
ZmqServer Side:
-
m_mqPollThread:
-
Receive message from ZMQ.
-
De-serialize received message then dispatch message to ZmqConsumerStateTable.
-
ZmqServer manage a database name & table name to ZmqConsumerStateTable mapping. ZmqConsumerStateTable will register itself to this mapping.
-
ZmqServer will find ZmqConsumerStateTable by database name and table name.
-
-
Continue receive next message.
-
-
ZmqConsumerStateTable Side:
-
Receive message from ZmqServer then:
-
Notify select event
-
Notify m_dbUpdateThread
-
-
m_dbUpdateThread:
- Update data from m_DbUpdateDataQueue to Redis database.
-
-
Select will pop operations from m_receivedQueue with ZmqConsumerStateTable::pops().